package org.adam2.sariel.dao.impl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.RowMapper;

import java.io.IOException;
import java.util.List;

/**
 * 抽象dao类
 */
public abstract class AbstractDao {
    @Autowired
    public HbaseTemplate hbaseTemplate;

    /**
     * 在运行mapreduce时，spring依赖注入无法使用，只能用硬编码的方式创建对象
     */ {
        if (null == hbaseTemplate) {
            ApplicationContext ctx = new ClassPathXmlApplicationContext("config/hbase-spring.xml");
            hbaseTemplate = (HbaseTemplate) ctx.getBean("hbaseTemplate");
        }
    }

    /**
     * 插入Board对象
     *
     * @param rowName
     * @param familyName
     * @param qualifier
     * @param value
     */
    public void putBoard(String rowName, String familyName, String qualifier, byte[] value) {
        hbaseTemplate.put(org.adam2.sariel.constant.TableName.BOARD, rowName, familyName, qualifier, value);
    }

    /**
     * 根据表格名称tableName，批量加载数据到HBase
     *
     * @param mutationList
     * @param tableName
     */
    public void batchPut(List<Mutation> mutationList, String tableName) {
        Configuration config = hbaseTemplate.getConfiguration();
        Connection connection = null;
        BufferedMutator table = null;
        try {
            connection = ConnectionFactory.createConnection(config);
            table = connection.getBufferedMutator(TableName.valueOf(tableName));
            table.mutate(mutationList);
            table.flush();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                table.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 根据rowName, familyName, qualifier返回board表中的一个value值
     * @param rowName
     * @param familyName
     * @param qualifier
     * @return
     */
    public String getValueByRowNameAndFamilyAndQualifier(String rowName, String familyName, String qualifier) {
        return hbaseTemplate.get(org.adam2.sariel.constant.TableName.BOARD, rowName, familyName, qualifier, new RowMapper<String>() {
            public String mapRow(Result result, int rowNum) throws Exception {
                List<Cell> ceList = result.listCells();
                String res = "";
                if (ceList != null && ceList.size() > 0) {
                    for (Cell cell : ceList) {
                        res = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                    }
                }
                return res;
            }
        });
    }
}
